apollo客户端通知原理

您所在的位置:网站首页 apollo实时更新配置 原理 apollo客户端通知原理

apollo客户端通知原理

2023-08-09 20:10| 来源: 网络整理| 查看: 265

微信公众号:PersistentCoder

一、使用场景

   Apollo是携程开源的一个分布式配置中心,提供了丰富的能力,其中就包括配置发布动态通知。 image.png

   动态通知有很多应用场景,其目的就是将配置的更新实时同步到应用内存粒度,比如:

动态规则维护 黑白名单 半自动化刷新缓存 二、使用

   本篇文章主要围绕半自动化刷新缓存展开。在电商环境,分为商家B端和客户C端,商家在平台或者ERP更新或者发布一些配置变更需要同步到C端让用户感知到最新的内容。    首先考虑到B端的配置变更频率不会太频繁,所以C端会做缓存,那么如果B端发生变更如何通知到C端刷新缓存拉取最新配置内容,有两种实现方式:

B端配置变更后发布消息,C端监听变更消息,然后自动失效缓存 B端配置变更后,手动通知C端,然后失效缓存,也就是半自动化刷新

   自动失效缓存不展开分析,半自动化刷新实现也很简单,基于Apollo的客户端通知机制就可以实现,在配置中心发布变更主体,然后在应用层监听变更内容并做出响应操作即可。

1.Apollo新增配置

   在配置平台新增业务相关的key-value:

apollo.xxx.config_refresh = {"buzzId":"xxx","platform":1,"version":3} 复制代码

   包含业务主体信息,以及版本字段(用于处理配置无变更问题)。

2.编写事件监听

   使用ApolloConfigChangeListener注解标注处理对应key内容变更的方法。

@ApolloConfigChangeListener(interestedKeys = {APOLLO_XXX_CONFIG_REFRESH}) public void onChange(ConfigChangeEvent changeEvent) { Set changedKeys = changeEvent.changedKeys(); if(CollectionUtils.isEmpty(changedKeys)) { log.warn("onChange nothing change;changeKeys={}",changedKeys); return; } if(!changedKeys.contains(APOLLO_XXX_CONFIG_REFRESH)) { log.warn(".onChange change event not contains config;changeKeys={}",changedKeys); return; } log.info("onChange config change;start reinitialize ..........."); ConfigChange change = changeEvent.getChange(APOLLO_XXX_CONFIG_REFRESH); String oldVal = change.getOldValue(); String newVal = change.getNewValue(); log.info("onChange;change '{}' from oldVal:{} to newValue:{}",APOLLO_XXX_CONFIG_REFRESH,oldVal,newVal); if(!this.isJson(newVal)) { log.info("onChange not valid json;newVal={}",newVal); return; } JSONObject json = JSON.parseObject(newVal); String buzzId = null; if(null == (buzzId = json.getString(BUZZ_ID_KEY))) { return; } Integer platform = json.getInteger(PLATFORM_KEY); Integer version = json.getInteger(VERSION_KEY); //手动让缓存失效 try { this.doExpireCache(buzzId,platform,version) } catch (Exception e) { log.error("onChange refresh config cache occur error;buzzId={},platform={},version={}",buzzId,platform,version,e); } } 复制代码

   这样在发生B端配置变更的时候,在配置平台发布对应key-value,然后C端应用接收到变更内容,就会做出相应处理,将缓存清掉。

三、原理&源码分析

   从前边的案例可以看出,核心能力支撑就是Apollo的客户端通知,那么我们就来分析一下Apollo客户端通知能力的实现原理。 Apollo客户端通知的实现,我分为三个维度分析,分别是配置变更监听器准备、变更通知准备、变更通知执行。

1.配置变更监听器准备

   在不接入其他中间件封装的情况下,使用的入口是EnableApolloConfig注解,我们从该注解着手分析。

@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(ApolloConfigRegistrar.class) public @interface EnableApolloConfig { String[] value() default {ConfigConsts.NAMESPACE_APPLICATION}; int order() default Ordered.LOWEST_PRECEDENCE; } 复制代码

   该注解导入并激活ApolloConfigRegistrar类。

public class ApolloConfigRegistrar implements ImportBeanDefinitionRegistrar { private ApolloConfigRegistrarHelper helper = ServiceBootstrap.loadPrimary(ApolloConfigRegistrarHelper.class); @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { helper.registerBeanDefinitions(importingClassMetadata, registry); } } 复制代码

   ApolloConfigRegistrar是一个ImportBeanDefinitionRegistrar,其原理和调用时机可参考《ImportBeanDefinitionRegistrar原理》,通过java spi机制加载ApolloConfigRegistrarHelper实现类DefaultApolloConfigRegistrarHelper的实例。 image.png

com.ctrip.framework.apollo.spring.spi.DefaultApolloConfigRegistrarHelper 复制代码

   然后调用registerBeanDefinitions方法注册BeanDefinition:

public class DefaultApolloConfigRegistrarHelper implements ApolloConfigRegistrarHelper { @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { AnnotationAttributes attributes = AnnotationAttributes .fromMap(importingClassMetadata.getAnnotationAttributes(EnableApolloConfig.class.getName())); String[] namespaces = attributes.getStringArray("value"); int order = attributes.getNumber("order"); PropertySourcesProcessor.addNamespaces(Lists.newArrayList(namespaces), order); Map propertySourcesPlaceholderPropertyValues = new HashMap(); // to make sure the default PropertySourcesPlaceholderConfigurer's priority is higher than PropertyPlaceholderConfigurer propertySourcesPlaceholderPropertyValues.put("order", 0); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesPlaceholderConfigurer.class.getName(), PropertySourcesPlaceholderConfigurer.class, propertySourcesPlaceholderPropertyValues); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, PropertySourcesProcessor.class.getName(), PropertySourcesProcessor.class); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, ApolloAnnotationProcessor.class.getName(), ApolloAnnotationProcessor.class); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, SpringValueProcessor.class.getName(), SpringValueProcessor.class); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, SpringValueDefinitionProcessor.class.getName(), SpringValueDefinitionProcessor.class); BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, ApolloJsonValueProcessor.class.getName(), ApolloJsonValueProcessor.class); } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } } 复制代码

   其中有一行注册ApolloAnnotationProcessor类定义,我们看一下ApolloAnnotationProcessor是何方神圣。 ApolloAnnotationProcessor.png    它是一个BeanPostProcessor,父类ApolloProcessor重写了postProcessBeforeInitialization方法:

@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { Class clazz = bean.getClass(); for (Field field : findAllField(clazz)) { processField(bean, beanName, field); } for (Method method : findAllMethod(clazz)) { processMethod(bean, beanName, method); } return bean; } 复制代码

   该方法在Bean实例化之后初始化之前执行,扫描目标类的所有属性和方法然后执行逻辑,我们重点看processMethod方法,看一下ApolloAnnotationProcessor实现:

@Override protected void processMethod(final Object bean, String beanName, final Method method) { ApolloConfigChangeListener annotation = AnnotationUtils .findAnnotation(method, ApolloConfigChangeListener.class); if (annotation == null) { return; } Class[] parameterTypes = method.getParameterTypes(); ReflectionUtils.makeAccessible(method); String[] namespaces = annotation.value(); String[] annotatedInterestedKeys = annotation.interestedKeys(); String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes(); ConfigChangeListener configChangeListener = new ConfigChangeListener() { @Override public void onChange(ConfigChangeEvent changeEvent) { ReflectionUtils.invokeMethod(method, bean, changeEvent); } }; Set interestedKeys = annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null; Set interestedKeyPrefixes = annotatedInterestedKeyPrefixes.length > 0 ? Sets.newHashSet(annotatedInterestedKeyPrefixes) : null; for (String namespace : namespaces) { Config config = ConfigService.getConfig(namespace); if (interestedKeys == null && interestedKeyPrefixes == null) { config.addChangeListener(configChangeListener); } else { config.addChangeListener(configChangeListener, interestedKeys, interestedKeyPrefixes); } } } 复制代码

   将ApolloConfigChangeListener标注的方法包装成ConfigChangeListener然后注册到对应namespace的Config中。    注册流程如下: Apollo监听注册-.png

2.变更通知准备

   前边分析了将客户端的通知变更逻辑封装成了监听器注册备用,那么谁去触发监听器的逻辑呢? 接下来我们分析下如何将变更和通知逻辑关联起来。 wecom-temp-516abc69bc683466eceafe7ddb43c9c2.png    apollo-client包中spring.factories定义了ApolloApplicationContextInitializer类型ApplicationContextInitializer,而ApplicationContextInitializer会在应用启动时加载:

public SpringApplication(ResourceLoader resourceLoader, Class... primarySources) { //... setInitializers((Collection) getSpringFactoriesInstances(ApplicationContextInitializer.class)); //*** } 复制代码

   并且会在容器创建之后刷新之前执行ApplicationContextInitializer的initialize方法。

private void prepareContext(DefaultBootstrapContext bootstrapContext, ConfigurableApplicationContext context, ConfigurableEnvironment environment, SpringApplicationRunListeners listeners, ApplicationArguments applicationArguments, Banner printedBanner) { //*** applyInitializers(context); //*** } protected void applyInitializers(ConfigurableApplicationContext context) { for (ApplicationContextInitializer initializer : getInitializers()) { Class requiredType = GenericTypeResolver.resolveTypeArgument(initializer.getClass(), ApplicationContextInitializer.class); Assert.isInstanceOf(requiredType, context, "Unable to call initializer."); initializer.initialize(context); } } 复制代码

   所以在应用启动的时候,ApolloApplicationContextInitializer的initialize会被调用到。

@Override public void initialize(ConfigurableApplicationContext context) { ConfigurableEnvironment environment = context.getEnvironment(); if (!environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false)) { logger.debug("Apollo bootstrap config is not enabled for context {}, see property: ${{}}", context, PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED); return; } logger.debug("Apollo bootstrap config is enabled for context {}", context); initialize(environment); } 复制代码

   调用内部initialize方法进行初始化操作:

protected void initialize(ConfigurableEnvironment environment) { if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) { return; } String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION); logger.debug("Apollo bootstrap namespaces: {}", namespaces); List namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces); CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME); for (String namespace : namespaceList) { Config config = ConfigService.getConfig(namespace); composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config)); } environment.getPropertySources().addFirst(composite); } 复制代码

   调用ConfigService#getConfig获取每个namespace的配置信息,具体会委托给DefaultConfigManager的实现:

public Config getConfig(String namespace) { Config config = m_configs.get(namespace); if (config == null) { synchronized (this) { config = m_configs.get(namespace); if (config == null) { ConfigFactory factory = m_factoryManager.getFactory(namespace); config = factory.create(namespace); m_configs.put(namespace, config); } } } return config; } 复制代码

   由于系统刚启动,Config还没被缓存,所以会通过调用ConfigFactory的create方法创建Config.

public Config create(String namespace) { ConfigFileFormat format = determineFileFormat(namespace); if (ConfigFileFormat.isPropertiesCompatible(format)) { return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format)); } return new DefaultConfig(namespace, createLocalConfigRepository(namespace)); } 复制代码

   然后会调用到RemoteConfigRepository的构造方法:

public RemoteConfigRepository(String namespace) { m_namespace = namespace; m_configCache = new AtomicReference(); m_configUtil = ApolloInjector.getInstance(ConfigUtil.class); m_httpUtil = ApolloInjector.getInstance(HttpUtil.class); m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class); remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class); m_longPollServiceDto = new AtomicReference(); m_remoteMessages = new AtomicReference(); m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS()); m_configNeedForceRefresh = new AtomicBoolean(true); m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(), m_configUtil.getOnErrorRetryInterval() * 8); gson = new Gson(); this.trySync(); this.schedulePeriodicRefresh(); this.scheduleLongPollingRefresh(); } 复制代码

   里边调用了三个方法,首次同步、定时刷新和长轮询刷新。 先看一下首次同步:

@Override protected synchronized void sync() { Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig"); try { ApolloConfig previous = m_configCache.get(); ApolloConfig current = loadApolloConfig(); //reference equals means HTTP 304 if (previous != current) { logger.debug("Remote Config refreshed!"); m_configCache.set(current); this.fireRepositoryChange(m_namespace, this.getConfig()); } if (current != null) { Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()), current.getReleaseKey()); } transaction.setStatus(Transaction.SUCCESS); } catch (Throwable ex) { transaction.setStatus(ex); throw ex; } finally { transaction.complete(); } } 复制代码

   将本地缓存和远程加载的数据进行对比,如果不一致,用远程覆盖本地,然后触发变更事件fireRepositoryChange:

protected void fireRepositoryChange(String namespace, Properties newProperties) { for (RepositoryChangeListener listener : m_listeners) { try { listener.onRepositoryChange(namespace, newProperties); } catch (Throwable ex) { Tracer.logError(ex); logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex); } } } 复制代码

   然后会调用触发配置变更,调用ConfigChangeListener的逻辑:

protected void fireConfigChange(final ConfigChangeEvent changeEvent) { for (final ConfigChangeListener listener : m_listeners) { // check whether the listener is interested in this change event if (!isConfigChangeListenerInterested(listener, changeEvent)) { continue; } m_executorService.submit(new Runnable() { @Override public void run() { String listenerName = listener.getClass().getName(); Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName); try { listener.onChange(changeEvent); transaction.setStatus(Transaction.SUCCESS); } catch (Throwable ex) { transaction.setStatus(ex); Tracer.logError(ex); logger.error("Failed to invoke config change listener {}", listenerName, ex); } finally { transaction.complete(); } } }); } } 复制代码

   对于定时刷新和长轮询刷新这两个功能在 apollo 的 github 文档中有介绍:

客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。(通过Http Long Polling实现) 客户端还会定时从Apollo配置中心服务端拉取应用的最新配置。 这是一个fallback机制,为了防止推送机制失效导致配置不更新 客户端定时拉取会上报本地版本,所以一般情况下,对于定时拉取的操作,服务端都会返回304 - Not Modified 定时频率默认为每5分钟拉取一次,客户端也可以通过在运行时指定System Property: apollo.refreshInterval来覆盖,单位为分钟。 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中 客户端会把从服务端获取到的配置在本地文件系统缓存一份 在遇到服务不可用,或网络不通的时候,依然能从本地恢复配置 应用程序可以从Apollo客户端获取最新的配置、订阅配置更新通知

   长连接是更新配置的主要手段,定时刷新是辅助手段,避免长轮训失败造成数据更新丢失。    看一下定时刷新实现:

private void schedulePeriodicRefresh() { logger.debug("Schedule periodic refresh with interval: {} {}", m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit()); m_executorService.scheduleAtFixedRate( new Runnable() { @Override public void run() { Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace)); logger.debug("refresh config for namespace: {}", m_namespace); trySync(); Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION); } }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit()); } 复制代码

   定时调用trySync方法实现数据同步,然后触发ConfigChangeListener逻辑。 然后看一下长轮询实现:

private void scheduleLongPollingRefresh() { remoteConfigLongPollService.submit(m_namespace, this); } 复制代码

   调用startLongPolling方法开启长轮询:

private void startLongPolling() { if (!m_longPollStarted.compareAndSet(false, true)) { //already started return; } try { final String appId = m_configUtil.getAppId(); final String cluster = m_configUtil.getCluster(); final String dataCenter = m_configUtil.getDataCenter(); final String secret = m_configUtil.getAccessKeySecret(); final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills(); m_longPollingService.submit(new Runnable() { @Override public void run() { if (longPollingInitialDelayInMills > 0) { try { logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills); TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills); } catch (InterruptedException e) { //ignore } } doLongPollingRefresh(appId, cluster, dataCenter, secret); } }); } catch (Throwable ex) { m_longPollStarted.set(false); ApolloConfigException exception = new ApolloConfigException("Schedule long polling refresh failed", ex); Tracer.logError(exception); logger.warn(ExceptionUtil.getDetailMessage(exception)); } } 复制代码

   调用doLongPollingRefresh方法执行长轮询刷新逻辑:

private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) { final Random random = new Random(); ServiceDTO lastServiceDto = null; while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) { if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { } } Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification"); String url = null; try { if (lastServiceDto == null) { List configServices = getConfigServices(); lastServiceDto = configServices.get(random.nextInt(configServices.size())); } url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter, m_notifications); HttpRequest request = new HttpRequest(url); request.setReadTimeout(LONG_POLLING_READ_TIMEOUT); if (!StringUtils.isBlank(secret)) { Map headers = Signature.buildHttpHeaders(url, appId, secret); request.setHeaders(headers); } transaction.addData("Url", url); final HttpResponse response = m_httpUtil.doGet(request, m_responseType); if (response.getStatusCode() == 200 && response.getBody() != null) { updateNotifications(response.getBody()); updateRemoteNotifications(response.getBody()); transaction.addData("Result", response.getBody().toString()); notify(lastServiceDto, response.getBody()); } if (response.getStatusCode() == 304 && random.nextBoolean()) { lastServiceDto = null; } m_longPollFailSchedulePolicyInSecond.success(); transaction.addData("StatusCode", response.getStatusCode()); transaction.setStatus(Transaction.SUCCESS); } catch (Throwable ex) { try { TimeUnit.SECONDS.sleep(sleepTimeInSecond); } catch (InterruptedException ie) { } } finally { transaction.complete(); } } } 复制代码

   每5s钟主动从Apollo Server拉取数据,如果请求成功,通知RemoteConfigRepository:

public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) { m_longPollServiceDto.set(longPollNotifiedServiceDto); m_remoteMessages.set(remoteMessages); m_executorService.submit(new Runnable() { @Override public void run() { m_configNeedForceRefresh.set(true); trySync(); } }); } 复制代码

   和定时刷新一样,也调用到了trySync逻辑,最后触发注册到对应namespace的Config上的ConfigChangeListener逻辑。 到这里就完成了变更通知的准备工作,流程大致如下:

3.变更通知执行

   用户更新配置时,客户端如何监听到变更事件并做出响应处理呢? 基于前一小结,如果用户发布了属性变更,RemoteConfigRepository的定时刷新或长轮询逻辑会从Apollo Server拉取最新数据到本地,然后和本地缓存(上一个版本数据)做对比,如果发现不一样则触发配置变更,调用ConfigChangeListener逻辑。 Apollo事件通知-Apollo.png

四、相关实现 1.redis事件通知

   比如我们要监听redis中的key失效事件,本地做一些定制化逻辑,那么就需要开启redis事件通知能力,然后本地做实现KeyExpirationEventMessageListener接口:

@Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } /** * 针对redis数据失效事件,进行逻辑处理 * @param message * @param pattern */ @Override public void onMessage(Message message, byte[] pattern) { String expiredKey = message.toString(); // 失效逻辑 } } } 复制代码

   redis中的key失效时会触发KeyExpirationEventMessageListener的onMessage,这样就实现了redis客户端的事件通知。

2.zookeeper watcher机制

   在使用zk做做注册中心或者分布式锁场景,我们需要监听zk的节点变更事件,比如节点被删除,那么客户端就需要监听该事件,然后本地做一些逻辑处理。

public class WatcherDemo implements Watcher{ public void process(WatchedEvent event) { //do something } } 复制代码

   节点变更事件类型有NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged和None,对于注册中心场景,服务消费者监听到服务节点被删除,那么可以在本地剔除远程服务节点。

五、为什么使用长轮询

   关于为什么使用 HTTP 长轮训,估计接触 Apollo 的人看到客户端通知实现方式时都会疑惑,为什么使用这种方式,而不是其他方式? 在网上找到了Apollo作者对该问题的解答 image.png

为什么不使用消息系统?太复杂,杀鸡用牛刀。 为什么不用 TCP 长连接?对网络环境要求高,容易推送失败。且有双写问题。 为什么使用 HTTP 长轮询?性能足够,结合 Servlet3 的异步特性,能够维持万级连接(一个客户端只有一个长连接)。直接使用 Servlet 的 HTTP 协议,比单独用 TCP 连接方便。HTTP 请求/响应模式,保证了不会出现双写的情况。最主要还是简单,性能暂时不是瓶颈。


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3